package com.chj.producer_balance.mandatory;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 生产者——失败确认模式
 */
public class ProducerMandatory {
    public final static String EXCHANGE_NAME = "mandatory_test";
    public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.200.169.5");
        connectionFactory.setVirtualHost("chj_vhost");
        connectionFactory.setUsername("chj");
        connectionFactory.setPassword("123456");
        // 打开连接和创建频道，与发送端一样
        Connection connection = connectionFactory.newConnection();
        final Channel channel = connection.createChannel();
        // 指定Direct交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //TODO  回调 连接关闭时执行
        connection.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println("connection was shutdown:"+cause.getMessage()+ System.currentTimeMillis());
            }
        });
        //TODO 回调 信道关闭时执行
        channel.addShutdownListener(new ShutdownListener() {
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println("channel was shutdown:"+cause.getMessage()+ System.currentTimeMillis());
            }
        });
        //TODO 失败通知 回调
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replycode, String replyText, String exchange, String routeKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                String message = new String(bytes);
                System.out.println("返回的replycode:"+replycode);
                System.out.println("返回的replyText:"+replyText);
                System.out.println("返回的exchange:"+exchange);
                System.out.println("返回的routeKey:"+routeKey);
            }
        });
        String[] routekeys={"hankin01","hankin02","hankin03"};
        for(int i=0;i<3;i++) {
            String routekey = routekeys[i % 3];
            // 发送的消息
            String message = "Hello World_" + (i + 1)+ ("_" + System.currentTimeMillis());
            //  TODO 消息确认
            channel.basicPublish(EXCHANGE_NAME,routekey,true,null,message.getBytes());
            System.out.println("----------------------------------");
            System.out.println(" Sent Message: [" + routekey +"]:'" + message + "'");
            Thread.sleep(200);
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}
